{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Custom Workloads with Futures\n",
    "=============================\n",
    "\n",
    "<img src=\"http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg\" \n",
    "     width=\"30%\" \n",
    "     align=right\n",
    "     alt=\"Dask logo\">\n",
    "\n",
    "Dask futures provide fine-grained real-time execution for custom situations.  This is the foundation for other APIs like Dask arrays and dataframes."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Start Dask Client\n",
    "\n",
    "Unlike for arrays and dataframes, you need the Dask client to use the Futures interface.  Additionally the client provides a dashboard which \n",
    "is useful to gain insight on the computation.\n",
    "\n",
    "The link to the dashboard will become visible when you create the client below.  We recommend having it open on one side of your screen while using your notebook on the other side.  This can take some effort to arrange your windows, but seeing them both at the same is very useful when learning."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from dask.distributed import Client, progress\n",
    "client = Client(threads_per_worker=4, n_workers=1)\n",
    "client"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create simple functions\n",
    "\n",
    "These functions do simple operations like add two numbers together, but they sleep for a random amount of time to simulate real work."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import time\n",
    "import random\n",
    "\n",
    "def inc(x):\n",
    "    time.sleep(random.random())\n",
    "    return x + 1\n",
    "\n",
    "def double(x):\n",
    "    time.sleep(random.random())\n",
    "    return 2 * x\n",
    "    \n",
    "def add(x, y):\n",
    "    time.sleep(random.random())\n",
    "    return x + y "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We can run them locally"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "inc(1)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Or we can submit them to run remotely with Dask.  This immediately returns a future that points to the ongoing computation, and eventually to the stored result."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "future = client.submit(inc, 1)  # returns immediately with pending future\n",
    "future"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "If you wait a second, and then check on the future again, you'll see that it has finished."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "future  # scheduler and client talk constantly"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You can block on the computation and gather the result with the `.result()` method."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "future.result()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Chain dependencies\n",
    "\n",
    "You can submit tasks on other futures.  This will create a dependency between the inputs and outputs.  Dask will track the execution of all tasks, ensuring that downstream tasks are run at the proper time and place and with the proper data."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "x = client.submit(inc, 1)\n",
    "y = client.submit(double, 2)\n",
    "z = client.submit(add, x, y)\n",
    "z"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "z.result()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Note that we never blocked on `x` or `y` nor did we ever have to move their data back to our notebook."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Submit many tasks\n",
    "\n",
    "So we've learned how to run Python functions remotely.  This becomes useful when we add two things:\n",
    "\n",
    "1.  We can submit thousands of tasks per second\n",
    "2.  Tasks can depend on each other by consuming futures as inputs\n",
    "\n",
    "We submit many tasks that depend on each other in a normal Python for loop"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "zs = []"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "\n",
    "for i in range(256):\n",
    "    x = client.submit(inc, i)     # x = inc(i)\n",
    "    y = client.submit(double, x)  # y = inc(x)\n",
    "    z = client.submit(add, x, y)  # z = inc(y)\n",
    "    zs.append(z)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "total = client.submit(sum, zs)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To make this go faster, add an additional workers with more cores \n",
    "\n",
    "(although we're still only working on our local machine, this is more practical when using an actual cluster)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "client.cluster.scale(10)  # ask for ten 4-thread workers"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Custom computation: Tree summation\n",
    "\n",
    "As an example of a non-trivial algorithm, consider the classic tree reduction.  We accomplish this with a nested for loop and a bit of normal Python logic.\n",
    "\n",
    "```\n",
    "finish           total             single output\n",
    "    ^          /        \\\n",
    "    |        c1          c2        neighbors merge\n",
    "    |       /  \\        /  \\\n",
    "    |     b1    b2    b3    b4     neighbors merge\n",
    "    ^    / \\   / \\   / \\   / \\\n",
    "start   a1 a2 a3 a4 a5 a6 a7 a8    many inputs\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "L = zs\n",
    "while len(L) > 1:\n",
    "    new_L = []\n",
    "    for i in range(0, len(L), 2):\n",
    "        future = client.submit(add, L[i], L[i + 1])  # add neighbors\n",
    "        new_L.append(future)\n",
    "    L = new_L                                   # swap old list for new"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "If you're watching the [dashboard's status page](../proxy/8787/status) then you may want to note two things:\n",
    "\n",
    "1.  The red bars are for inter-worker communication.  They happen as different workers need to combine their intermediate values\n",
    "2.  There is lots of parallelism at the beginning but less towards the end as we reach the top of the tree where there is less work to do.\n",
    "\n",
    "Alternatively you may want to navigate to the [dashboard's graph page](../proxy/8787/graph) and then run the cell above again.  You will be able to see the task graph evolve during the computation."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Building a computation dynamically\n",
    "----------------------------------\n",
    "\n",
    "In the examples above we explicitly specify the task graph ahead of time.  We know for example that the first two futures in the list `L` will be added together.  \n",
    "\n",
    "Sometimes this isn't always best though, sometimes you want to dynamically define a computation as it is happening.  For example we might want to sum up these values based on whichever futures show up first, rather than the order in which they were placed in the list to start with.\n",
    "\n",
    "For this, we can use operations like [as_completed](http://dask.pydata.org/en/latest/futures.html#distributed.as_completed).\n",
    "\n",
    "We recommend watching the dashboard's graph page when running this computation.  You should see the graph construct itself during execution."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "del future, L, new_L, total  # clear out some old work"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from dask.distributed import as_completed\n",
    "\n",
    "zs = client.map(inc, zs)\n",
    "seq = as_completed(zs)\n",
    "\n",
    "while seq.count() > 2:  # at least two futures left\n",
    "    a = next(seq)\n",
    "    b = next(seq)\n",
    "    new = client.submit(add, a, b)  # add them together\n",
    "    seq.add(new)                    # add new future back into loop"
   ]
  }
 ],
 "metadata": {
  "anaconda-cloud": {},
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.1"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
